package api.transform;

import api.beans.SensorReading;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Demonstrates a {@code reduce} aggregation on a keyed Flink stream.
 *
 * <p>Reads comma-separated records from a text file, parses each line into a
 * {@link SensorReading} (id, timestamp, temperature), keys the stream by sensor id and
 * reduces each key to a reading that carries the maximum temperature seen so far together
 * with the timestamp of the most recently processed record. The reduction is expressed as a
 * lambda; it is equivalent to an anonymous {@link ReduceFunction} implementation.
 */
public class TransformTest3 {

    /** Input file used when no path is supplied on the command line. */
    private static final String DEFAULT_INPUT_PATH =
            "D:\\IdeaProjects\\springboot-flink-1\\flinkTutorial\\src\\main\\resources\\sensor.txt";

    /**
     * Builds and runs the job.
     *
     * @param args optional; {@code args[0]} is the path of the sensor data file. When absent,
     *             {@link #DEFAULT_INPUT_PATH} is used.
     * @throws Exception if the Flink job fails to execute
     */
    public static void main(String[] args) throws Exception {
        // Allow the input location to be overridden so the job is not tied to one machine.
        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Single parallel instance; presumably chosen to keep the printed output easy to follow.
        env.setParallelism(1);
        DataStream<String> inputStream = env.readTextFile(inputPath);

        // Parse "id,timestamp,temperature" lines into SensorReading objects.
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            // valueOf replaces the deprecated boxed-type constructors; the numeric fields are
            // trimmed because Long parsing rejects surrounding whitespace. The id is kept as-is
            // so that grouping keys are unchanged.
            return new SensorReading(
                    fields[0],
                    Long.valueOf(fields[1].trim()),
                    Double.valueOf(fields[2].trim()));
        });

        // Group readings by sensor id.
        KeyedStream<SensorReading, String> keyedStream = dataStream.keyBy(SensorReading::getId);

        // Per key: keep the highest temperature so far and the timestamp of the latest record.
        SingleOutputStreamOperator<SensorReading> resultStream = keyedStream.reduce(
                (value1, value2) -> new SensorReading(
                        value1.getId(),
                        value2.getTimestamp(),
                        Math.max(value1.getTemperature(), value2.getTemperature())));

        resultStream.print("reduce:");
        env.execute();
    }
}
